分布式系统Leader-Election实践

最近在实现基于golang的基础代码框架,正好实现了一遍leader选举的代码;研究了各种方式选主的实现,简单记录一下。

两种思路

刷存在感、论资排辈

以前搞网络协议的时候,绝大多数的选主流程都是依靠协议的节点相互之间发送:“请求->应答->确认”等报文来通告朋友圈自己的信息,耍存在感;协议定义了朋友圈儿中论资排辈的规则,比如先看MAC地址、再判断用户配置的优先级等等。

最终所有的节点信息达成同步,按照老祖宗定下的规矩,老大必须是xx,大家就安心干活,总有一天老大挂了,媳妇儿才能熬成婆。通过定期刷朋友圈儿(心跳报文)来保障当老大挂了之后能够立刻召开大会,进行下一轮选举、快速倒换业务。电信级的OAM甚至对业务的倒换时间都有毫秒级的要求。

所以,一般协议的RFC都包含了一套选主、论资排辈以及倒换业务的流程,只需要按照该流程实现就行了。

但是,这样的实现一般需要一套组播发现机制来保障所有参与选举的节点都能够加入进来;或者通过手动配置的方式来实现。这样的实现,不依赖第三发,开发成本和复杂度都相对较大。在分布式中,常见的如raft协议等。

马仔抢帽子

互联网的业务可以依赖一些外部中间件,而中间件的高可用平台已提供了保障。因此,基于中间件来实现选主是一个更加低成本和高效的选择。借助于中间件,选主本质就变为一个抢锁的过程。

简单整理一下思路:

我们将leader角色看作写着老大字样的帽子,所有参与选主的节点就是参与抢帽子的马仔们。

  1. 一开始所有人都是马仔,然后开始抢帽子,谁先抢到锁(老大的帽子),谁就是这一个时间片的老大;
  2. 抢到帽子的老大拥有一定的特权,在这一个时间片结束前,老大如果还想继续扮演老大的角色,就自个儿去为自己续职;
  3. 没抢到老大帽子的马仔们,会不甘心当一辈子马仔,所以一旦有老大掉了老大帽子的消息就飞速的去争抢。

通过这样一个简单的抢帽子比赛,组织内部就维持了某一时间点只有一个老大的状态。这个简单的比喻,我们大概知道了选主的流程。也知道了这种选主方式与先前讲的多个成员定期耍朋友圈,刷存在感,再论资排辈之间在架构上的差异是引入了中间件。

那么哪些中间件才能用来做这个事情呢?这里又涉及到两种情况。

  1. 马仔们高度自觉,发现帽子已经被人拥有后,不会起歹念;这种情况中间件只用来存放时间状态。
  2. 马仔们素质较低,一有机会就试图抢走帽子;这种情况下中间件必须将帽子放入笼子,保障在定时器未到期时,帽子没法被马仔们抢走。

第一种情况就是k8s对leader election的实现方式。而我基于consul实现的是对第二种方式。

实现分析

K8s选主

K8s的controller-manger和scheduler是有状态的,它的HA是A+P模式。通过选主,只在主节点上运行业务,其他从节点处于待命状态。抢到锁的节点会将自己的标记(目前是hostname)设为锁的持有者,其他人则需要通过对比锁的更新时间和持有者来判断自己是否能成为新的 leader ,而 leader 则可以通过更新 RenewTime 来确保持续保有该锁。

k8s并没有使用中间件,但是本质上是使用了etcd来存放我们之前比喻中的帽子。只是这个帽子通过API-Server演化成了k8s的object;go-client中有两种实现,一种是configmap,另外一种是endpoint。而锁的属性(更新时间、持有者)通过k8s object的annotation字段存放起来;该 annotation 的 key 为 control-plane.alpha.kubernetes.io/leader(如下)。

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@ljchen ~]# kcs get endpoints kube-controller-manager -o yaml
apiVersion: v1
kind: Endpoints
metadata:
annotations:
control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"dev-7","leaseDurationSeconds":15,"acquireTime":"2018-05-22T09:54:30Z","renewTime":"2018-07-29T07:06:17Z","leaderTransitions":0}'
creationTimestamp: 2018-05-22T09:54:56Z
name: kube-controller-manager
namespace: kube-system
resourceVersion: "22700552"
selfLink: /api/v1/namespaces/kube-system/endpoints/kube-controller-manager
uid: 2e879038-5da6-11e8-9dac-00163e0f7d95
subsets: null

基于consul的选主

consul 是一个伟大的产品,大家都熟知它被用来实现服务的注册与发现、配置中心,统一下发配置等。

除了这些大家都熟知的功能,consul还有一个类似于redis超时时间的功能。即consul中的key可以绑定一个session,这个session有TTL,当session的持有者在TTL超时之前不去续命,consul就会自动释放session持有者对key的独占权。另外,consul基于long polling的实时watch机制也决定了它天然适合用来实现锁。

基本原理如下:

  1. 客户端创建session,并试着去获取锁;
  2. 如果获取锁成功,按照TTL/2的周期去续命;
  3. 如果获取锁失败,开始watch锁;
  4. 整个过程,任何一个环节报错,就重新开始一遍流程;当失败次数大于指定的次数,退出选举(可能consul无法连接或其他外部异常)。

代码实现大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
const (
LeaderElectionPathFmt = "lock/%s/leader"
)

type LeaderElection struct {
Consul *Consul
TTL time.Duration
Callback func(leader bool)
key string
}

func (l *LeaderElection) Run() {
var (
identity = Utils{}.GetMyIPAddr()
retryPeriod = time.Second * 5
maxAttempt = 30
attempt = 0
kv = l.Consul.client.KV()
session = l.Consul.client.Session()
)

l.key = fmt.Sprintf(LeaderElectionPathFmt, Utils{}.GetAppName())
callback := func(leader bool) {
GetRoleInst().SetRole(leader)
l.Callback(leader)
}

se := &consul.SessionEntry{
Name: identity,
TTL: "10s",
LockDelay: time.Nanosecond,
}

for {
// check retry times
if attempt > maxAttempt {
panic("Run retry times reach Max failed count.")
}

// create new session
sessionId, _, err := session.CreateNoChecks(se, nil)
if err != nil {

logs.Info("create session err: %v, retry after 5 second.", err)
time.Sleep(retryPeriod)
attempt ++

continue
}

logs.Info("session sessionId:", sessionId)
p := &consul.KVPair{
Key: l.key,
Value: []byte(identity),
Session: sessionId,
}

// try to acquire lock
locked, _, err := kv.Acquire(p, nil)
if err != nil {

logs.Info("acquire err: %v, retry after 5 second.", err)
time.Sleep(retryPeriod)
attempt ++

continue
}

// unlocked
if !locked {

callback(false)
respChan := l.Consul.WatchKey(l.key, nil)

select {
case ret := <-respChan:
if ret.Error != nil {
logs.Info("watch key err: %v, retry after 5 second.", err)
time.Sleep(retryPeriod)

} else {
logs.Info("leader released, it's time to election lock!")
}
}

// locked
} else {

callback(true)
// RenewPeriodic 是一个阻塞函数
err := session.RenewPeriodic(se.TTL, sessionId, nil, nil)
utils.Display("err:", err)

callback(false)
}

attempt = 0
}
}

0%